package com.atguigu.app.dwd;

import com.atguigu.utils.KafkaUtil;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import java.time.Duration;

/**
 * DWD job that builds the "payment succeeded" order-detail stream.
 *
 * <p>Pipeline, in the order it is executed:
 * <ol>
 *   <li>declare source 1: Kafka table {@code topic_db};</li>
 *   <li>declare source 2: Kafka table {@code dwd_trade_order_detail};</li>
 *   <li>extract the payment-success rows from source 1 into the view {@code payment_info};</li>
 *   <li>declare source 3: the JDBC dictionary table {@code base_dic};</li>
 *   <li>join the three tables (interval join + lookup join);</li>
 *   <li>define the target table (not implemented yet);</li>
 *   <li>write to the target table (not implemented yet; the result is printed instead).</li>
 * </ol>
 */
public class DwdTradePayDetailSuc {

    /**
     * Kafka consumer group used by every Kafka source of this job.
     * The topic_db source previously used the group of another job ('dwd_trade_cart_add'),
     * so with 'group-offsets' startup both jobs shared one set of committed offsets.
     */
    private static final String CONSUMER_GROUP_ID = "trade_pay_suc";

    /** Idle state retention; same length as the 15-minute lower bound of the interval join. */
    private static final Duration STATE_RETENTION = Duration.ofMinutes(15);

    /**
     * Entry point: registers the sources, builds the joined result and prints it.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        // 0. environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        tableEnv.getConfig().setIdleStateRetention(STATE_RETENTION);

        // 1. source 1: topic_db
        tableEnv.executeSql(topicDbDdl());

        // 2. source 2: dwd_trade_order_detail
        tableEnv.executeSql(orderDetailDdl());

        // 3. payment-success rows extracted from source 1
        tableEnv.createTemporaryView("payment_info", tableEnv.sqlQuery(paymentSuccessQuery()));

        // 4. source 3: dictionary table base_dic
        tableEnv.executeSql(baseDicDdl());

        // 5. join the three tables
        tableEnv.executeSql(paySucJoinQuery()).print();

        // 6. define the target table
        // 7. write to the target table
        // TODO: steps 6 and 7 are not implemented; the joined result is only printed above.
    }

    /** Builds the DDL of the Kafka-backed {@code topic_db} table, including processing-time and event-time columns. */
    private static String topicDbDdl() {
        return "  CREATE TABLE topic_db (\n" +
                "         `database`    STRING,\n" +
                "          `table` STRING,\n" +
                "         `type` STRING,\n" +
                "           `ts` STRING,\n" +
                "           `data` MAP<STRING,STRING>,\n" +
                "           `old` MAP<STRING,STRING>,\n" +
                "          `proc_time` as proctime(),\n" +
                "           row_time as TO_TIMESTAMP(FROM_UNIXTIME(cast(ts as bigint))),\n" +
                "            watermark for row_time as row_time" +
                "         ) WITH (\n" +
                "            'connector' = 'kafka',\n" +
                "          'topic' = 'topic_db',\n" +
                "           'properties.bootstrap.servers' = 'hadoop102:9092',\n" +
                "           'properties.group.id' = '" + CONSUMER_GROUP_ID + "',\n" +
                "           'scan.startup.mode' = 'group-offsets',\n" +
                // A dedicated group has no committed offsets on its first start; state the
                // fallback explicitly ('latest' is the Kafka client default) so that
                // 'group-offsets' startup does not depend on connector-version defaults.
                "           'properties.auto.offset.reset' = 'latest',\n" +
                "           'format' = 'json'\n" +
                "         )";
    }

    /** Builds the DDL of {@code dwd_trade_order_detail}; the WITH clause comes from {@code KafkaUtil.getKafkaDDL}. */
    private static String orderDetailDdl() {
        return "create table dwd_trade_order_detail(" +
                "id string,\n" +
                "order_id string,\n" +
                "user_id string,\n" +
                "sku_id string,\n" +
                "sku_name string,\n" +
                "province_id string,\n" +
                "activity_id string,\n" +
                "activity_rule_id string,\n" +
                "coupon_id string,\n" +
                "date_id string,\n" +
                "create_time string,\n" +
                "source_id string,\n" +
                "source_type string,\n" +
                "source_type_name string,\n" +
                "sku_num string,\n" +
                "split_original_amount string,\n" +
                "split_activity_amount string,\n" +
                "split_coupon_amount string,\n" +
                "split_total_amount string,\n" +
                "ts string,\n" +
                "   row_time as TO_TIMESTAMP(FROM_UNIXTIME(cast(ts as bigint))),\n" +
                "    watermark for row_time as row_time" +
                " )" +
                KafkaUtil.getKafkaDDL("dwd_trade_order_detail", CONSUMER_GROUP_ID);
    }

    /**
     * Builds the query selecting payment-success rows from {@code topic_db}:
     * table {@code payment_info}, type {@code update}, {@code payment_status = '1602'}.
     */
    private static String paymentSuccessQuery() {
        return "        select\n" +
                "data['user_id'] user_id,\n" +
                "data['order_id'] order_id,\n" +
                "data['payment_type'] payment_type,\n" +
                "data['callback_time'] callback_time,\n" +
                "row_time,\n" +      // used by the interval join
                "`proc_time`,\n" +   // used by the lookup join
                "ts\n" +
                " from topic_db \n" +
                "         where `table`='payment_info' \n" +
                "          and `type`='update' \n" +
                "          and `data`['payment_status']='1602'";
    }

    /** Builds the DDL of the JDBC lookup table {@code base_dic} (cache: 1000 rows, 60s TTL). */
    private static String baseDicDdl() {
        // NOTE(review): connection URL and credentials are hard-coded; consider externalizing them.
        return "  CREATE TABLE base_dic (\n" +
                "          dic_code STRING,\n" +
                "          dic_name STRING,\n" +
                "          PRIMARY KEY (dic_code) NOT ENFORCED\n" +
                "        ) WITH (\n" +
                "           'connector' = 'jdbc',\n" +
                "           'driver' = 'com.mysql.cj.jdbc.Driver',\n" +
                "           'url' = 'jdbc:mysql://hadoop102:3306/gmall',\n" +
                "           'table-name' = 'base_dic',\n" +
                "          'username' = 'root',\n" +
                "          'password' = '000000',\n" +
                "          'lookup.cache.max-rows' = '1000',\n" +
                "          'lookup.cache.ttl' = '60s'\n" +
                "         ) ";
    }

    /**
     * Builds the final join query.
     *
     * <p>{@code dwd_trade_order_detail} is interval-joined with {@code payment_info} on
     * {@code order_id}, accepting order-detail rows whose {@code row_time} lies between
     * 15 minutes before and 15 seconds after the payment's {@code row_time}.
     * {@code base_dic} is lookup-joined (as of {@code pi.proc_time}) on {@code payment_type}
     * to resolve the payment type name.
     *
     * <p>Pattern taken from the official interval-join example:
     * <pre>
     * SELECT *
     * FROM Orders o, Shipments s
     * WHERE o.id = s.order_id
     * AND o.order_time BETWEEN s.ship_time - INTERVAL '4' HOUR AND s.ship_time
     * </pre>
     */
    private static String paySucJoinQuery() {
        return "    select\n" +
                "od.id order_detail_id,\n" +
                "od.order_id,\n" +
                "od.user_id,\n" +
                "od.sku_id,\n" +
                "od.sku_name,\n" +
                "od.province_id,\n" +
                "od.activity_id,\n" +
                "od.activity_rule_id,\n" +
                "od.coupon_id,\n" +
                "pi.payment_type ,\n" +
                "bd.dic_name as payment_type_name,\n" +
                "pi.callback_time,\n" +
                "od.source_id,\n" +
                "od.source_type source_type_code,\n" +
                "od.source_type_name,\n" +
                "od.sku_num,\n" +
                "od.split_original_amount,\n" +
                "od.split_activity_amount,\n" +
                "od.split_coupon_amount,\n" +
                "od.split_total_amount split_payment_amount,\n" +
                "pi.ts\n" +
                "      from dwd_trade_order_detail od ,  payment_info pi\n" +
                "         join  base_dic FOR SYSTEM_TIME AS OF pi.proc_time as bd  on  bd.dic_code=pi.payment_type\n" +
                "        where od.order_id =pi.order_id\n" +
                "        and od.row_time BETWEEN pi.row_time - INTERVAL '15' MINUTE and  pi.row_time + INTERVAL '15' SECOND\n ";
    }
}
